feat: basic table scan planning #112
Co-authored-by: Gang Wu <[email protected]>
};

/// \brief Represents a task to scan a portion of a data file.
class ICEBERG_EXPORT FileScanTask : public ScanTask {
Some thoughts about FileScanTask:
- Should we remove ScanTask abstraction above? If we remove the abstraction, we can directly use aggregate initialization to create a task. Otherwise we may need to expand the constructor every time a new parameter is required.
- If we do (1) above, is it possible also to make it a simple struct by removing all functions (as they are all trivial accessors).
- Should we add fields (a.k.a. spec and partition_value) from Java PartitionScanTask to support partitioning? We can add them later but a TODO comment is desirable.
- Should we combine start and length, and wrap them by std::optional? I believe they are not required at all times.
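For illustration, a minimal sketch of what the struct-based variant could look like. It assumes a simplified DataFile stand-in and a hypothetical ByteRange and SizeInBytes helper; none of these names come from the PR, and the real DataFile has many more fields.

```cpp
#include <cstdint>
#include <memory>
#include <optional>
#include <string>

// Hypothetical stand-in for the real DataFile; only the fields used below.
struct DataFile {
  std::string file_path;
  int64_t file_size_in_bytes = 0;
};

// Hypothetical byte range within a data file (start and length combined).
struct ByteRange {
  int64_t start = 0;
  int64_t length = 0;
};

// Plain aggregate: no base class, no constructor, no trivial accessors.
struct FileScanTask {
  std::shared_ptr<DataFile> data_file;
  // Absent means "scan the whole file".
  std::optional<ByteRange> range;
  // TODO: partition spec and partition value, as in Java's PartitionScanTask.
};

// Bytes covered by the task: the range length if set, else the file size.
inline int64_t SizeInBytes(const FileScanTask& task) {
  return task.range ? task.range->length : task.data_file->file_size_in_bytes;
}
```

With this shape, a new field only changes the aggregate initializer at call sites (for example `FileScanTask{file, std::nullopt}`) and no constructor has to be extended.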
- I initially expected it to just be a struct, but since the previous comments suggested introducing an abstraction, I referred to the design in iceberg-java/iceberg-python.
- Partition spec and value can be obtained from DataFile and Snapshot, and we can add these interfaces when needed in a subsequent PR.
- Sure, I will modify it to optional, thanks.
Partition spec and value can be obtained from DataFile and Snapshot
That's a good point
src/iceberg/table_scan.cc
Outdated
    data_entry.sequence_number.value_or(TableMetadata::kInitialSequenceNumber);
for (auto it = sequence_index.lower_bound(data_sequence_number);
     it != sequence_index.end(); ++it) {
  // Additional filtering logic here
What is the additional filtering logic? Did you mean to further check if the delete files can be filtered?
Shouldn't a DataFile only retain the DeleteFiles whose sequence number is greater than its own?
This differs per equality and positional deletes. I think there is a pretty good overview here: https://iceberg.apache.org/spec/#scan-planning
src/iceberg/table_scan.cc
Outdated
  return sizeInBytes;
}

int32_t FileScanTask::files_count() const {
I'm not sure if we need to rename it to FilesCount(). @lidavidm suggestion?
src/iceberg/table_scan.h
Outdated
};

/// \brief A scan that reads data files and applies delete files to filter rows.
class ICEBERG_EXPORT DataScan : public TableScan {
I'm a little bit confused about the name of Scan and ScanTask across different implementations. Should this be DataTableScan which produces FileScanTask? For DataScan, I think it should produce a group of DataTask which contains rows of FileScanTask.
Simply put:
Scan -> ScanTask
TableScan -> FileScanTask
DataScan -> DataTask
DataScan inherits TableScan inherits Scan
DataTask inherits FileScanTask inherits ScanTask
BTW, I think we can constantly evolve this design because APIs can be unstable before the 1.0.0 release.
At PyIceberg we tried to copy the Java structure, but in the end I think it is too much OOP for Python. It may be good to start small in C++ as well. While we can change APIs until 1.0.0, I think it is important to get this one right pretty early on, since this is the main integration point for query engines.
src/iceberg/table_scan.cc
Outdated
for (auto it = sequence_index.lower_bound(data_sequence_number);
     it != sequence_index.end(); ++it) {
Is this incorrect? It finds the lower bound and then traverses all the sequence numbers from data_sequence_number upward.
Yes, the meaning here is to find all DeleteFiles corresponding to this DataFile. Only those with a sequence number higher than the DataFile need to be read.
Higher or equal for positional deletes: https://iceberg.apache.org/spec/#scan-planning
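As a rough sketch of the sequence-number rule from the linked spec section: a position delete applies when its sequence number is greater than or equal to the data file's, while an equality delete applies only when it is strictly greater. The DeleteFile record, the DeleteKind enum, the multimap layout, and ApplicableDeletes are simplified assumptions for illustration, and partition and column-bound checks are left out.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

enum class DeleteKind { kPosition, kEquality };

// Hypothetical, simplified delete file record.
struct DeleteFile {
  std::string path;
  DeleteKind kind;
};

// Assumed index layout: delete files keyed by their data sequence number.
using SequenceIndex = std::multimap<int64_t, DeleteFile>;

// Returns the delete files that pass the sequence-number check for a data
// file with the given data sequence number.
std::vector<DeleteFile> ApplicableDeletes(const SequenceIndex& index,
                                          int64_t data_sequence_number) {
  std::vector<DeleteFile> result;
  // lower_bound yields the first entry whose key is >= data_sequence_number.
  for (auto it = index.lower_bound(data_sequence_number); it != index.end();
       ++it) {
    const auto& [delete_sequence_number, delete_file] = *it;
    // Equality deletes need a strictly greater sequence number, so entries
    // with an equal sequence number are skipped for that kind only.
    if (delete_file.kind == DeleteKind::kEquality &&
        delete_sequence_number == data_sequence_number) {
      continue;
    }
    result.push_back(delete_file);
  }
  return result;
}
```

Starting the traversal at lower_bound keeps the position deletes with an equal sequence number in range; the extra check inside the loop is what separates the two delete kinds.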
I just have a comment w.r.t. the table scan name. Elsewhere LGTM.
@@ -107,4 +108,8 @@ const std::vector<SnapshotLogEntry>& Table::history() const {

const std::shared_ptr<FileIO>& Table::io() const { return io_; }

std::unique_ptr<TableScanBuilder> Table::NewScan() const {
  return std::make_unique<TableScanBuilder>(metadata_, io_);
How about passing in the Table instead? That has all the metadata, and also the io.
If I pass "Table" to the "TableScanBuilder," I cannot pass it further to "DataTableScan," as "Table" can only be passed by reference to the "TableScanBuilder."
@Fokko There are some (outdated) comments on this: #112 (comment)
I just reviewed it for another pass and added some nits (we can improve them in followup PRs). Thanks for working on it!
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file)
    : data_file_(std::move(file)) {}
-FileScanTask::FileScanTask(std::shared_ptr<DataFile> file)
-    : data_file_(std::move(file)) {}
+FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
+    : data_file_(std::move(data_file)) {}
nit: I think eventually we need to rename it to data_file once we add delete files.
TableScanBuilder& TableScanBuilder::WithColumnNames(
    std::vector<std::string> column_names) {
  column_names_ = std::move(column_names);
nit: make sure context_.projected_schema is not set.
}

TableScanBuilder& TableScanBuilder::WithProjectedSchema(std::shared_ptr<Schema> schema) {
  context_.projected_schema = std::move(schema);
nit: make sure column_names_ is not set.
When the two settings conflict, it is not very convenient to throw an exception here, so I want to put the check in the build step.
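A minimal sketch of deferring the conflict check to the build step might look like the following. ScanBuilderSketch, BuildStatus, and Build are hypothetical names used only for illustration (BuildStatus stands in for the project's Result type), and Schema is an empty placeholder.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Empty placeholder for the real Schema type.
struct Schema {};

// Hypothetical stand-in for the project's Result type.
struct BuildStatus {
  bool ok = true;
  std::string message;
};

// Simplified builder: setters only record values, Build() validates them.
class ScanBuilderSketch {
 public:
  ScanBuilderSketch& WithColumnNames(std::vector<std::string> column_names) {
    column_names_ = std::move(column_names);
    return *this;
  }

  ScanBuilderSketch& WithProjectedSchema(std::shared_ptr<Schema> schema) {
    projected_schema_ = std::move(schema);
    return *this;
  }

  // Reports a conflict if both a column list and a projected schema are set.
  BuildStatus Build() const {
    if (!column_names_.empty() && projected_schema_ != nullptr) {
      return {false, "column names and projected schema are both set"};
    }
    return {};
  }

 private:
  std::vector<std::string> column_names_;
  std::shared_ptr<Schema> projected_schema_;
};
```

Keeping the setters free of validation lets them return the builder reference for chaining, and the caller sees a single error at the point where the scan is actually built.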
    return InvalidArgument("No snapshot ID specified for table {}",
                           table_metadata->table_uuid);
  }
  auto iter = std::ranges::find_if(
nit: add Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot(int64_t snapshot_id) const and move the logic below to it.
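A simplified sketch of such a lookup helper, under stated assumptions: TableMetadataSketch and SnapshotById are hypothetical names, the helper returns nullptr instead of a Result, and it uses std::find_if where the PR code uses std::ranges::find_if.

```cpp
#include <algorithm>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for the real Snapshot; only the ID is modeled.
struct Snapshot {
  int64_t snapshot_id;
};

// Hypothetical, reduced version of the table metadata.
struct TableMetadataSketch {
  std::vector<std::shared_ptr<Snapshot>> snapshots;

  // Returns the snapshot with the given ID, or nullptr if none matches.
  // The real helper would return a Result and an error instead of nullptr.
  std::shared_ptr<Snapshot> SnapshotById(int64_t snapshot_id) const {
    auto iter = std::find_if(snapshots.begin(), snapshots.end(),
                             [snapshot_id](const auto& snapshot) {
                               return snapshot->snapshot_id == snapshot_id;
                             });
    if (iter == snapshots.end()) {
      return nullptr;
    }
    return *iter;
  }
};
```

Moving the search into the metadata type means the scan builder only has to handle the "not found" case, and the same helper can serve other callers.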
}

const auto& schemas = table_metadata->schemas;
const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) {
ditto, we can add TableMetadata::Schema(int64_t schema_id) for this.
Thank you for working on this. I'm glad we've reached a consensus. Let's move forward!
Introducing basic scan table data interface